/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.engine.meta.crawler.infer

import com.chinamobile.cmss.lakehouse.engine.meta.crawler.FileSystemHolder
import com.chinamobile.cmss.lakehouse.engine.meta.crawler.model._
import com.typesafe.config.Config
import io.circe.parser._
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.OriginalType
import org.slf4j.LoggerFactory
import org.w3c.dom.{Node => XmlNode}

import java.io.{BufferedReader, InputStreamReader}
import javax.xml.parsers.{DocumentBuilder, DocumentBuilderFactory}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}


/**
 * Contract for inferring the [[Schema]] of data located at a given path.
 */
trait SchemaInfer {
  /**
   * Attempts to infer the schema for `path`.
   *
   * @param path file or directory to inspect
   * @return Success with the inferred schema, or Failure describing why
   *         inference was not possible
   */
  def infer(path: Path): Try[Schema]
}

object SchemaInfer {

  /**
   * Builds the [[SchemaInfer]] implementation selected by the job's
   * `source.resolver` setting.
   *
   * @param jobConfig job configuration; must contain the key `source.resolver`
   * @return a resolver-specific inferrer (currently only "parquet" is supported)
   * @throws IllegalArgumentException if the configured resolver is unknown
   */
  def apply(jobConfig: Config): SchemaInfer = {
    val resolverName = jobConfig.getString("source.resolver")
    val fileSystemHolder = FileSystemHolder(jobConfig)
    resolverName match {
      case "parquet" =>
        new ParquetInfer(ParquetInferConfig(fileSystemHolder, jobConfig))
      case other =>
        // Fail fast with a descriptive error instead of an opaque MatchError.
        throw new IllegalArgumentException(
          s"Unsupported source.resolver: '$other', supported resolvers: parquet"
        )
    }
  }
}

/**
 * Common configuration contract for schema inferrers: access to the file
 * system and to the raw job configuration.
 */
trait InferConfig {
  /** Holder exposing the target file system and its Hadoop configuration. */
  def fileSystemHolder: FileSystemHolder

  /** The raw job configuration the inferrer was built from. */
  def jobConfig: Config
}

case class DirectoryInfer(jobConfig: Config) extends SchemaInfer {
  val fileSystemHolder = FileSystemHolder(jobConfig)
  val infer = SchemaInfer(jobConfig)
  private val logger = LoggerFactory.getLogger(getClass)

  /**
   * Infers one schema for a whole directory: every file directly under `path`
   * is inferred individually and all results must succeed and agree.
   *
   * @param path directory to inspect (non-recursive: only direct children)
   * @return Success with the common schema (its `path` field rewritten to the
   *         directory path), or Failure when the directory is empty, any file
   *         fails to infer, or the files disagree on schema
   */
  override def infer(path: Path): Try[Schema] = {
    val inferred =
      fileSystemHolder.fs.listStatus(path).map(f => infer.infer(f.getPath))
    if (inferred.isEmpty) {
      // Guard: `.head` below would throw NoSuchElementException on an empty dir.
      Failure(new RuntimeException(s"Failed to find metadata! No files under path: ${path.toString}"))
    } else {
      val head = inferred.head
      val failures = inferred.collect { case Failure(e) => e }
      if (failures.nonEmpty) {
        // Surface every failed file, not just the first one.
        failures.foreach { e =>
          logger.warn(s"path: ${path.toString} infer schema failure due to: ", e)
        }
        // Return Failure when any file's metadata could not be read.
        Failure(new RuntimeException("Failed to find metadata, failed to find schema"))
      } else if (inferred.map(_.map(_.toSchemaKey)).exists(_ != head.map(_.toSchemaKey))) {
        logger.warn(
          s"path: ${path.toString} infer schema failure different schema"
        )
        Failure(new RuntimeException("Failed to find metadata! Files under the path have different schema"))
      } else {
        // All files agree; report the schema against the directory path itself.
        head.map(_.copy(path = path.toString))
      }
    }
  }
}

/**
 * Configuration for [[ParquetInfer]]: bundles the file-system holder used to
 * open Parquet files together with the raw job configuration.
 *
 * @param fileSystemHolder holder exposing the file system and Hadoop configuration
 * @param jobConfig        the raw job configuration
 */
case class ParquetInferConfig(
    fileSystemHolder: FileSystemHolder,
    jobConfig: Config)
  extends InferConfig

class ParquetInfer(config: ParquetInferConfig) extends SchemaInfer {

  /**
   * Infers the schema of a single Parquet file from its footer metadata,
   * mapping each Parquet primitive type to the crawler's column types.
   *
   * @param path path of the Parquet file to read
   * @return Success with the inferred [[Schema]], or Failure when the footer
   *         cannot be opened or parsed (only non-fatal errors are captured;
   *         fatal VM errors propagate)
   */
  override def infer(path: Path): Try[Schema] = Try {
    val reader =
      ParquetFileReader.open(
        HadoopInputFile.fromPath(path, config.fileSystemHolder.hadoopConfiguration)
      )
    try {
      val fields = reader.getFileMetaData.getSchema.getFields.asScala
      val columnList = fields.map { field =>
        val primitive = field.asPrimitiveType().getPrimitiveTypeName()
        val columnType = primitive.getMethod match {
          case "getBoolean" => BooleanType
          case "getInteger" => IntType
          case "getLong" => BigIntType
          case "getFloat" => FloatType
          case "getDouble" => DoubleType
          case "getBinary" =>
            // Binary-backed types need the physical type name to disambiguate.
            primitive.name() match {
              case "FIXED_LEN_BYTE_ARRAY" =>
                // Fixed-length binary carries decimal precision/scale metadata.
                val decimalMeta = field.asPrimitiveType().getDecimalMetadata
                DecimalType(decimalMeta.getPrecision, decimalMeta.getScale)
              case "INT96" => TimeStampType
              case _
                if field.getOriginalType != null && field.getOriginalType.equals(
                  OriginalType.UTF8
                ) =>
                StringType
              case _ => BinaryType
            }
          case _ => StringType
        }
        ColumnInfo(field.getName, columnType)
      }.toList
      Schema.simple(
        "",
        path.toString,
        columnList,
        StorageFormat.DEFAULT_PARQUET_FORMAT,
        Map.empty
      )
    } finally {
      // The original leaked the reader; always release the file handle.
      reader.close()
    }
  }
}

